/********************************************************************
	File :			Dispatcher.cpp
	Creation date :	2010/6/27
		
	License :			Copyright 2010 Ahmed Charfeddine, http://www.pushframework.com

				   Licensed under the Apache License, Version 2.0 (the "License");
				   you may not use this file except in compliance with the License.
				   You may obtain a copy of the License at
				
					   http://www.apache.org/licenses/LICENSE-2.0
				
				   Unless required by applicable law or agreed to in writing, software
				   distributed under the License is distributed on an "AS IS" BASIS,
				   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
				   See the License for the specific language governing permissions and
				   limitations under the License.
	
	
*********************************************************************/
#include "StdAfx.h"
#include "Dispatcher.h"

#include "ScopedLock.h"
#include "Channel.h"
#include "ServerImpl.h"
#include "ServerStats.h"
#include "../include/Protocol.h"
#include "../include/OutgoingPacket.h"
#include "../include/IncomingPacket.h"
#include "../include/ClientFactory.h"
#include "../include/Client.h"
#include "../include/Service.h"
#include "../include/BroadcastManager.h"
#include "../include/Server.h"
#include "ChannelFactory.h"
#include "ClientImpl.h"
#include "ClientFactoryImpl.h"

#include "BroadcastManagerImpl.h"

#include "MonitorsBroadcastManager.h"

#include "BroadcastChannel.h"
#include "MonitorProtocol.h"
#include "MonitorRequestPacket.h"
#include "MonitorResponsePacket.h"

#include "StopWatch.h"
#include "Utilities.h"

namespace PushFramework{


// Builds a dispatcher bound to the given server implementation.
// Stores the server pointer, initializes the critical section csSrvMap and
// creates the broadcast manager used for monitor channels, handing it the
// server's channel factory.
Dispatcher::Dispatcher(ServerImpl* pServerImpl)
{
	this->pServerImpl = pServerImpl;

	::InitializeCriticalSection(&csSrvMap);

	pMonitorsBroadcastManager = new MonitorsBroadcastManager(pServerImpl->getChannelFactory());
}

// Releases what the dispatcher allocated: the ServiceInfo records created by
// registerService, the critical section csSrvMap and the monitors broadcast
// manager. The Service handlers referenced by the records are NOT deleted
// here; only the bookkeeping records are.
Dispatcher::~Dispatcher(void)
{
	// registerService allocates every record with new; free them so the
	// service table does not leak when the dispatcher goes away.
	for (serviceMapT::iterator it = serviceMap.begin(); it != serviceMap.end(); ++it)
	{
		delete it->second;
	}
	serviceMap.clear();

	::DeleteCriticalSection(&csSrvMap);

	delete pMonitorsBroadcastManager;
}

std::string Dispatcher::getServiceNames()
{
	std::stringstream ss;
	ss << std::noskipws;

	for (serviceMapT::iterator it = serviceMap.begin();
		it!=serviceMap.end();
		it++)
	{
		ss << "<request value=\"" << it->second->serviceName << "\"/>";
	}

	return ss.str();
}

// Records serviceName as the service currently associated with the calling
// thread (keyed by pthread_self()). The map is updated under csSrvMap.
void Dispatcher::setCurrentService( std::string serviceName )
{
	ScopedLock guard(csSrvMap);

	workerServiceMap[pthread_self()] = serviceName;
}

// Removes the calling thread's entry from the thread -> service map, if any.
// The map is updated under csSrvMap.
void Dispatcher::UnsetCurrentService()
{
	ScopedLock guard(csSrvMap);

	// Erasing by key does nothing when the thread has no entry.
	workerServiceMap.erase(pthread_self());
}

bool Dispatcher::getCurrentService( std::string& serviceName )
{
	ScopedLock lock(csSrvMap);
	pthread_t dwThread = pthread_self();

	workerServiceMapT::iterator it = workerServiceMap.find(dwThread);
	if (it==workerServiceMap.end())
		return false;

	serviceName = it->second;
	return true;
}


void Dispatcher::NotifyObserversClientIN( const char* key, std::string peerIP, unsigned int peerPort )
{
	std::string timestamp  = Utilities::getCurrentTime();

	// = "22:00::23";

	std::stringstream ss;

	ss << std::noskipws;

	//Write header :
	ss << "<root type=\"cin\">";
	ss << "<time value=\"" << timestamp << "\"/>";
	ss << "<name value=\"" << key << "\"/>";
	ss << "<ip value=\"" << peerIP << "\"/>";
	ss << "<port value=\"" << peerPort << "\"/>";
	ss << "</root>";


	std::string data = ss.str();

	OutgoingPacket* pPacket =  new MonitorResponsePacket(data);

	pMonitorsBroadcastManager->pushPacket(pPacket, "clientsIn", key, 0);


	
}
void Dispatcher::NotifyObserversClientOut( const char* key )
{
	// = "22:00::23";

	std::stringstream ss;

	ss << std::noskipws;

	//Write header :
	ss << "<root type=\"cout\">";
	ss << "<name value=\"" << key << "\"/>";
	ss << "</root>";

	std::string data = ss.str();
	OutgoingPacket* pPacket =  new MonitorResponsePacket(data);

	//Signal that client is out :
	pMonitorsBroadcastManager->pushPacket(pPacket, "clientsOut");

	//Remove client from the other broadcast group :
	pMonitorsBroadcastManager->removePacket(key, 0, "clientsIn");
}



// Handles a "ready to write" event for pChannel.
// Lets the channel continue sending; when the channel is attached and its
// send buffer is reported idle, the next pending broadcast packet (if any)
// is pushed to it. The work is bracketed by getObject/disposeObject on the
// channel factory.
void Dispatcher::OnReadyToWrite( CChannel* pChannel )
{
	pServerImpl->getChannelFactory()->getObject(pChannel->getKey());

	bool bBufferIdle;
	const int sendStatus = pChannel->OnSend(bBufferIdle);

	if (sendStatus == CChannel::Attached)
	{
		if (bBufferIdle)
		{
			ProcessClientPendingPackets(pChannel);
		}
	}

	pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
}

// Handles a completed read on pChannel.
//
// Steps:
//  1. Reads the received bytes; bails out if the channel status is below
//     CChannel::Connected.
//  2. A zero-byte read means the peer wants to close: the channel is closed
//     and, for an attached client channel, the client factory is notified.
//  3. Inbound bandwidth statistics are updated for client channels.
//  4. Every complete packet found in the receive buffer is deserialized and
//     routed: attached channels go to dispatchRequest / HandleMonitorRequest,
//     merely connected channels go to ProcessFirstPacket /
//     ProcessMonitorFirstPacket.
//  5. A new receive is posted only when the loop stopped on an incomplete
//     packet.
//
// The whole call is bracketed by getObject/disposeObject on the channel
// factory; every exit path calls disposeObject.
void Dispatcher::OnReadyToRead( CChannel* pChannel )
{
	pServerImpl->getChannelFactory()->getObject(pChannel->getKey());

	int dwIoSize;
	int status = pChannel->ReadReceivedBytes(dwIoSize);

	if (status < CChannel::Connected)
	{
		pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
		//PostReceive is not called.
		return;
	}

	bool isClient = !pChannel->isObserverChannel();

	//Channel is either connected or attached.

	if (dwIoSize == 0)//Peer wants to close the connection.
	{
		pChannel->Close(false);
		if (status == CChannel::Attached)
		{
			if (isClient)
			{
				std::string clientKey = pChannel->getClient();
				pServerImpl->getClientFactory()->onClientDisconnected(clientKey.c_str());
			}
			else
			{
				//TODO notify observer is out.
			}
		}
		pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
		return;
	}

	if (isClient)
	{
		pServerImpl->getStats()->addToCumul(ServerStats::BandwidthInbound, dwIoSize);
		pServerImpl->getStats()->addToKeyedDuration(ServerStats::BandwidthInboundPerConnection, pChannel->getKey(), dwIoSize);
	}

	//The Processing Loop.
	// Out-params are initialized so that nothing indeterminate is ever read,
	// whatever the protocol implementation writes back.
	int uCommandID = 0;
	IncomingPacket* pPacket = NULL;
	int iResult = Protocol::Success;
	unsigned int uExtractedBytes = 0;
	Protocol* pProtocol = pChannel->getProtocol();
	DataBuffer& sourceBuffer = pChannel->GetReceiveBuffer();

	// NOTE(review): 'status' is sampled once above and never refreshed inside
	// the loop. If one read delivers several packets, the packets following
	// the first one on a Connected channel are still routed to the
	// first-packet handlers, and the final 'else' below cannot be reached
	// through a change made by another thread. Confirm whether the status
	// should be re-read from the channel on every iteration.
	bool bProcessDataInQueue = true;
	while (bProcessDataInQueue)
	{
		iResult = pProtocol->tryDeserializeIncomingPacket(sourceBuffer, pPacket, uCommandID, uExtractedBytes);
		if (iResult == Protocol::Success)
		{
			// Consume the packet's bytes before handing it to a handler.
			sourceBuffer.Pop(uExtractedBytes);
			if (status == CChannel::Attached)
			{
				if(isClient)
					dispatchRequest(uCommandID, pChannel->getClient().c_str(), *pPacket, uExtractedBytes);
				else
					HandleMonitorRequest(pChannel, *pPacket);
			}
			else if(status == CChannel::Connected)
			{
				if(isClient)
					ProcessFirstPacket(pChannel, uCommandID, *pPacket,uExtractedBytes);
				else
					ProcessMonitorFirstPacket(pChannel, *pPacket);
			}
			else{
				//Status changed by another thread eg ::disconnect.
				bProcessDataInQueue = false;
			}
			pProtocol->disposeIncomingPacket(pPacket);
		}
		else if (iResult == Protocol::eDecodingFailure)
		{
			// Skip the undecodable bytes and keep scanning the buffer.
			sourceBuffer.Pop(uExtractedBytes);
		}
		else
			break;
	}

	// Ask for more data only when the buffer ends in a partial packet.
	if (iResult == Protocol::eIncompletePacket)
	{
		pChannel->PostReceive();
	}
	pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
}

// Handles a newly initialized channel.
// For client (non-observer) channels the client factory is asked for an
// optional greeting packet; when one is returned, the context produced by the
// factory is saved on the channel, the packet is pushed and then disposed
// through the server facade. In every case the first receive is posted.
// The work is bracketed by getObject/disposeObject on the channel factory.
void Dispatcher::OnInitializeReady( CChannel* pChannel )
{
	pServerImpl->getChannelFactory()->getObject(pChannel->getKey());

	if (!pChannel->isObserverChannel())
	{
		void* pConnectionContext = NULL;
		OutgoingPacket* pGreeting = pServerImpl->getClientFactory()->onNewConnection(pConnectionContext);

		// The context is only stored when a greeting packet was produced.
		if (pGreeting != NULL)
		{
			pChannel->saveContext(pConnectionContext);
			pChannel->PushPacket(pGreeting);
			pServerImpl->getFacade()->disposeOutgoingPacket(pGreeting);
		}
	}

	pChannel->PostReceive();

	pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
}





// Garbage-collection hook: delegates to the channel factory's
// closeNonLogged().
// NOTE(review): presumably this closes channels that never completed login;
// confirm against ChannelFactory.
void Dispatcher::OnStartGC()
{
	pServerImpl->getChannelFactory()->closeNonLogged();
}

// Profiling hook: obtains the current performance packet from the server
// statistics and pushes it to the monitors' "stats" broadcast group.
void Dispatcher::OnStartProfiling()
{
	pMonitorsBroadcastManager->pushPacket(
		pServerImpl->getStats()->getPerformancePacket(),
		"stats");
}

void Dispatcher::registerService( unsigned int uCommmand, Service* pService, std::string serviceName )
{
	ServiceInfo* pServiceInfo = new ServiceInfo;

	pServiceInfo->pService = pService;
	pServiceInfo->serviceName = serviceName;

	serviceMap[uCommmand] = pServiceInfo;
}

void Dispatcher::dispatchRequest( unsigned int uCommand,const char* uClient,IncomingPacket& packet,unsigned int serviceBytes )
{
	//StopWatch dispatchWatch(m_QPFrequency);

	serviceMapT::iterator it = serviceMap.find(uCommand);
	if(it == serviceMap.end())
		return;
	//		
	Service* pHandler = it->second->pService;

	//wcout << L"Locating Service : " << dispatchWatch.GetElapsedTime(false) << std::endl;

	//Mark dispatched service :

	setCurrentService(it->second->serviceName);

	StopWatch watch;
	pHandler->handle(uClient, &packet);

	
	double duration = watch.GetElapsedTime();
/*	wcout << L"Service Time : " << watch.GetElapsedTime() << std::endl;
*/


	//StopWatch statsClock(m_QPFrequency);
	pServerImpl->getStats()->addToDistribution(ServerStats::PerformanceProcessingTimePerService, it->second->serviceName, duration);
	//wcout << L"Stat 1 : " << statsClock.GetElapsedTime(false) << std::endl;

	pServerImpl->getStats()->addToDuration(ServerStats::PerformanceProcessingTime, duration);
	//wcout << L"Stat 2 : " << statsClock.GetElapsedTime(false) << std::endl;

	UnsetCurrentService();

	//Stats. :
	
	pServerImpl->getStats()->addToDistribution(ServerStats::BandwidthInboundVolPerRequest, it->second->serviceName, serviceBytes);
	//wcout << L"Stat 3 : " << statsClock.GetElapsedTime(false) << std::endl;
	

	pServerImpl->getStats()->addToDistribution(ServerStats::PerformanceRequestVolPerRequest, it->second->serviceName, 1);
	//wcout << L"Stat 4 : " << statsClock.GetElapsedTime(false) << std::endl;


	//wcout << L"Dispatch Time : " << dispatchWatch.GetElapsedTime() << std::endl;
}

// Handles the first packet received on a connected, not yet attached client
// channel.
//
// The client factory decides what happens:
//  - RefuseAndClose : the channel is closed.
//  - RefuseRequest  : an optional reply packet is pushed; the channel stays
//                     as it is.
//  - otherwise      : the returned Client is initialized, bound to the
//                     channel and added to the client factory. If a client
//                     with the same key already exists, the new instance is
//                     disposed, the old channel is closed, the existing
//                     client is re-bound to this channel and the reconnect
//                     event fires. Otherwise the connect event fires,
//                     monitors are notified and, for CreateAndRouteRequest,
//                     the packet is dispatched as a regular request.
//
//   pChannel     - channel the packet arrived on
//   uCommand     - command id of the packet
//   packet       - the deserialized first request
//   serviceBytes - number of bytes the request occupied in the receive buffer
void Dispatcher::ProcessFirstPacket( CChannel* pChannel,unsigned int uCommand, IncomingPacket& packet,unsigned int serviceBytes )
{
	OutgoingPacket* pOutPacket = NULL;
	Client* pClient = NULL;
	int iResult = pServerImpl->getClientFactory()->onFirstRequest(packet, pChannel->getContext(), pClient, pOutPacket);

	//packet and lpContext are not good.
	if (iResult==ClientFactory::RefuseAndClose)
	{
		pChannel->Close(false);
		return;
	}
	//
	if (iResult == ClientFactory::RefuseRequest)
	{
		if (pOutPacket)
		{
			pChannel->PushPacket(pOutPacket);
			pServerImpl->getFacade()->disposeOutgoingPacket(pOutPacket);
		}
		return;
	}

	//iResult >= CClientFactory::CreateClient

	// The factory accepted the request but produced no client object: there
	// is nothing to attach, so treat it like a refusal instead of
	// dereferencing a null pointer below.
	if (pClient == NULL)
	{
		pChannel->Close(false);
		return;
	}

	//Init the Impl :
	pClient->pImpl->init();
	pClient->pImpl->setServerImpl(pServerImpl);

	// Copy the key now: it is still needed after the client object has been
	// handed over to (or disposed by) the client factory.
	std::string clientKey = pClient->getKey();
	unsigned int channelKey = pChannel->getKey();

	pChannel->attachToClient(clientKey.c_str()); //status is attached.
	pClient->pImpl->setChannel(channelKey);

	pServerImpl->getStats()->addToCumul(ServerStats::VisitorsHitsIn, 1);

	Client* pExistingClient = pServerImpl->getClientFactory()->pImpl->tryAddClient(pClient);
	if (pExistingClient)
	{
		//Delete created instance.
		pServerImpl->getClientFactory()->disposeClient(pClient);

		//Close past channel.
		unsigned int existingChannel = pExistingClient->pImpl->getChannel();
		CChannel* pPastChannel = pServerImpl->getChannelFactory()->getObject(existingChannel);
		if (pPastChannel)
		{
			pPastChannel->Close(false);
			pServerImpl->getChannelFactory()->disposeObject(existingChannel);
		}

		//Attach client to new channel :
		pExistingClient->pImpl->setChannel(channelKey);

		//Release reference :
		pServerImpl->getClientFactory()->returnClient(clientKey.c_str());

		//Fire Reconnect event.
		pServerImpl->getClientFactory()->onClientReconnected(clientKey.c_str());
		return;
	}

	//Fire connect event.
	pServerImpl->getClientFactory()->onClientConnected(clientKey.c_str());

	//Statistics :
	NotifyObserversClientIN(clientKey.c_str(), pChannel->getPeerIP(), pChannel->getPeerPort());
	pServerImpl->getStats()->addToCumul(ServerStats::VisitorsOnline, 1);

	if (iResult==ClientFactory::CreateAndRouteRequest)
	{
		// Use the copied key rather than pClient: the client object now
		// belongs to the client factory.
		dispatchRequest(uCommand, clientKey.c_str(), packet, serviceBytes);
	}
}

// Executes a console command sent by an attached monitor channel.
//
// The command text is read from the packet's "command" argument:
//  - "disconnect"        : closes the channel, no reply.
//  - "about"             : replies with the server infos and framework version.
//  - "profiling enable"  : enables profiling unless it is already enabled.
//  - "profiling disable" : disables profiling unless it is already disabled.
//  - "profiling status"  : reports whether profiling is enabled.
//  - anything else       : forwarded to the server facade; a reply is sent
//                          only when the facade reports it handled the command.
//
// Every reply has the form <root type="console" value="TEXT"/>.
void Dispatcher::HandleMonitorRequest( CChannel* pChannel, IncomingPacket& packet )
{
	MonitorRequestPacket& requestPacket = (MonitorRequestPacket&) packet;

	std::string command = requestPacket.getArgumentAsText("command");

	if (command == "disconnect")
	{
		pChannel->Close(false);
		return;
	}

	// Text placed in the "value" attribute of the console reply.
	std::string consoleText;

	if (command == "about")
	{
		consoleText = pServerImpl->getServerInfos() + "\nBased on PushFramework version 1.0";
	}
	else if (command == "profiling enable")
	{
		if (pServerImpl->getProfilingStatus() == true)
		{
			consoleText = "Profiling is already enabled.";
		}
		else
		{
			pServerImpl->enableProfiling(-1);
			consoleText = "Profiling was enabled.";
		}
	}
	else if (command == "profiling disable")
	{
		if (pServerImpl->getProfilingStatus() == false)
		{
			consoleText = "Profiling is already disabled.";
		}
		else
		{
			pServerImpl->disableProfiling();
			consoleText = "Profiling was disabled.";
		}
	}
	else if (command == "profiling status")
	{
		// Built-in command: answered here and never forwarded to the facade.
		if (pServerImpl->getProfilingStatus() == false)
			consoleText = "Profiling was found to be disabled.";
		else
			consoleText = "Profiling was found to be enabled.";
	}
	else
	{
		// Zero-filled so the buffer is a valid C string even if the facade
		// writes nothing; the last byte is forced to NUL after the call.
		char pOut[256] = {0};

		bool bRet = pServerImpl->getFacade()->handleMonitorRequest(command.c_str(), pOut);
		if (!bRet)
			return;

		pOut[sizeof(pOut) - 1] = '\0';
		consoleText = pOut;
	}

	std::stringstream ss;
	ss << std::noskipws;
	// Narrow literals only: a wide literal streamed into a narrow stream is
	// printed as a pointer value, not as text.
	ss << "<root type=\"console\" value=\"" << consoleText << "\"/>";
	MonitorResponsePacket response(ss.str());
	pChannel->PushPacket(&response);
}

// Handles the first packet received on a connected monitor channel (login).
// The packet's "accessKey" argument is compared with the server's monitor
// password:
//  - mismatch : a <root type="cref"/> reply is pushed; the channel is not
//               attached.
//  - match    : the channel is attached (with an empty client key) and the
//               statistics initialization packet is pushed.
// A <root type="pref"/> reply followed by Close(true) exists for a protocol
// version mismatch, but the version compared is taken from the server's own
// Version structure, so as written that branch cannot be taken.
void Dispatcher::ProcessMonitorFirstPacket( CChannel* pChannel, IncomingPacket& packet )
{
	MonitorRequestPacket& loginRequest = (MonitorRequestPacket&) packet;

	std::string suppliedKey = loginRequest.getArgumentAsText("accessKey");

	Version serverVersion;
	pServerImpl->getVersion(serverVersion);

	// The monitor's protocol version is not read from the request; the
	// server's own value is used instead.
	unsigned int monitorProtocol = serverVersion.monitorProtocolVer;

	if (!(suppliedKey == pServerImpl->getMonitorPassword()))
	{
		// Wrong password: refuse.
		MonitorResponsePacket* pRefusal = new MonitorResponsePacket("<root type=\"cref\"/>");
		pChannel->PushPacket(pRefusal);
		pServerImpl->getFacade()->disposeOutgoingPacket(pRefusal);
		return;
	}

	if (serverVersion.monitorProtocolVer != monitorProtocol)
	{
		MonitorResponsePacket* pVersionRefusal = new MonitorResponsePacket("<root type=\"pref\"/>");
		pChannel->PushPacket(pVersionRefusal);
		pServerImpl->getFacade()->disposeOutgoingPacket(pVersionRefusal);
		pChannel->Close(true);
		return;
	}

	pChannel->attachToClient("");

	OutgoingPacket* pWelcome = pServerImpl->getStats()->getInitializationPacket();
	pChannel->PushPacket(pWelcome);
	pServerImpl->getFacade()->disposeOutgoingPacket(pWelcome);
}


// Sends at most one pending broadcast packet to pChannel.
// Observer channels subscribe under their channel key (converted to text)
// and use the monitors broadcast manager; client channels subscribe under
// their client key and use the server's broadcast manager. When a packet is
// available it is pushed to the channel and then handed back to the manager
// together with the push result.
void Dispatcher::ProcessClientPendingPackets( CChannel* pChannel )
{
	std::string subscriberKey;
	BroadcastManagerImplBase* pBroadcastMgr = NULL;

	// Pick the subscriber key and the broadcast manager in one place.
	if (pChannel->isObserverChannel())
	{
		subscriberKey = Utilities::ConvertInteger(pChannel->getKey());
		pBroadcastMgr = pMonitorsBroadcastManager;
	}
	else
	{
		subscriberKey = pChannel->getClient();
		pBroadcastMgr = pServerImpl->getBroadcastManager()->pImpl;
	}

	std::string groupName;
	BroadcastChannel::PacketHANDLE hNext;

	OutgoingPacket* pNext = pBroadcastMgr->getNextPacket(subscriberKey.c_str(), hNext, groupName);
	if (pNext == NULL)
	{
		return;
	}

	bool bPushed = pChannel->PushPacket(pNext);
	pBroadcastMgr->disposePacket(hNext, groupName, subscriberKey, bPushed);
}





}

